/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.allyes.flume.source;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.PollableSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;

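/**
 * A polling source that tails the files matched by the configured file
 * groups, much like Flume's bundled Taildir source: newly appended lines
 * are read in batches, delivered to the channel, and the committed
 * position (and line index) of every file is periodically persisted to a
 * JSON position file so tailing can resume where it left off after a
 * restart. Files that stay idle for {@code idleTimeout} ms are closed
 * until they are updated again.
 */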
public class AllyesTaildirSource extends AbstractSource
    implements PollableSource, Configurable {

  private static final Logger logger = LoggerFactory.getLogger(AllyesTaildirSource.class);

  private Map<String, String> filePaths;
  private Table<String, String, String> headerTable;
  private int batchSize;
  private String positionFilePath;
  private HashSet<String> dynamicHeaders = new HashSet<String>();

  private SourceCounter sourceCounter;
  private ReliableTaildirEventReader reader;
  private ScheduledExecutorService idleFileChecker;
  private ScheduledExecutorService positionWriter;
  private int retryInterval = 1000;            // ms; doubled after each channel failure
  private final int maxRetryInterval = 5000;   // ms; cap for retryInterval
  private int idleTimeout;                     // ms without updates before a file is idle
  private final int checkIdleInterval = 30000; // ms between idle-file checks
  private final int writePosInitDelay = 5000;  // ms before the first position write
  private int writePosInterval;                // ms between position writes

  private final List<Long> existingInodes = new CopyOnWriteArrayList<Long>();
  private final List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
  private Long backoffSleepIncrement;
  private Long maxBackOffSleepInterval;

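  /**
   * Builds the tail reader and starts the two background tasks: one that
   * flags idle files so they can be closed, and one that periodically
   * persists the read positions.
   */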
  @Override
  public synchronized void start() {
    logger.info("{} AllyesTaildirSource source starting with directory: {}", getName(), filePaths);
    try {
      reader = new ReliableTaildirEventReader.Builder(
          filePaths,
          headerTable,
          positionFilePath,
          dynamicHeaders).build();

    } catch (IOException e) {
      throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
    }

    idleFileChecker = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
    idleFileChecker.scheduleWithFixedDelay(new IdleFileCheckerRunnable(),
        idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);

    positionWriter = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
    positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
        writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("AllyesTaildirSource started");
    sourceCounter.start();
  }

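  /**
   * Shuts down the background tasks (waiting up to one second for each),
   * writes the final read positions, and closes the reader.
   */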
  @Override
  public synchronized void stop() {
    try {
      super.stop();
      ExecutorService[] services = {idleFileChecker, positionWriter};
      for (ExecutorService service : services) {
        service.shutdown();
        if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
          service.shutdownNow();
        }
      }
      // write the last position
      writePosition();
      reader.close();
    } catch (InterruptedException e) {
      logger.info("Interrupted while awaiting termination", e);
    } catch (IOException e) {
      logger.info("Failed: " + e.getMessage(), e);
    }
    sourceCounter.stop();
    logger.info("AllyesTaildir source {} stopped. Metrics: {}", getName(), sourceCounter);
  }

  @Override
  public String toString() {
    return String.format("AllyesTaildir source: { positionFile: %s, " +
        "idleTimeout: %s, writePosInterval: %s }",
        positionFilePath, idleTimeout, writePosInterval);
  }

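  /**
   * Reads the source configuration. A minimal sketch of the expected
   * properties, assuming the constant values match Flume's stock Taildir
   * source (the authoritative names live in
   * TaildirSourceConfigurationConstants):
   *
   * <pre>
   * a1.sources.r1.filegroups        = fg1 fg2
   * a1.sources.r1.filegroups.fg1    = /var/log/app/app\.log
   * a1.sources.r1.filegroups.fg2    = /var/log/web/.*\.log
   * a1.sources.r1.positionFile      = /var/run/flume/taildir_position.json
   * a1.sources.r1.headers.fg1.topic = app
   * </pre>
   */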
  @Override
  public synchronized void configure(Context context) {
    String fileGroups = context.getString(TaildirSourceConfigurationConstants.FILE_GROUPS);
    Preconditions.checkState(fileGroups != null,
        "Missing param: " + TaildirSourceConfigurationConstants.FILE_GROUPS);

    filePaths = selectByKeys(
        context.getSubProperties(TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX),
        fileGroups.split("\\s+"));
    Preconditions.checkState(!filePaths.isEmpty(),
        "Mapping for tailing files is empty or invalid: '"
            + TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX + "'");

    String homePath = System.getProperty("user.home").replace('\\', '/');
    positionFilePath = context.getString(TaildirSourceConfigurationConstants.POSITION_FILE,
        homePath + TaildirSourceConfigurationConstants.DEFAULT_POSITION_FILE);

    Path positionFile = Paths.get(positionFilePath);
    try {
      Files.createDirectories(positionFile.getParent());
    } catch (IOException e) {
      throw new FlumeException("Error creating positionFile parent directories", e);
    }

    headerTable = getTable(context, TaildirSourceConfigurationConstants.HEADERS_PREFIX);
    batchSize = context.getInteger(TaildirSourceConfigurationConstants.BATCH_SIZE,
        TaildirSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
    idleTimeout = context.getInteger(TaildirSourceConfigurationConstants.IDLE_TIMEOUT,
        TaildirSourceConfigurationConstants.DEFAULT_IDLE_TIMEOUT);
    writePosInterval = context.getInteger(TaildirSourceConfigurationConstants.WRITE_POS_INTERVAL,
        TaildirSourceConfigurationConstants.DEFAULT_WRITE_POS_INTERVAL);

    dynamicHeaders.clear();
    String dynamicHeadersStr =
        context.getString(TaildirSourceConfigurationConstants.TXT_DYNAMIC_HEADERS);
    if (dynamicHeadersStr != null) {
      for (String h : dynamicHeadersStr.split("\\s+")) {
        Preconditions.checkState(
            TaildirSourceConfigurationConstants.allDynamicHeaders.contains(h),
            "Invalid dynamic header: " + h);
        dynamicHeaders.add(h);
      }
    }

    backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
        PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
    maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
        PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);

    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }
  }

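  /**
   * Returns the entries of {@code map} whose keys appear in {@code keys};
   * keys with no mapping are silently skipped.
   */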
  private Map<String, String> selectByKeys(Map<String, String> map, String[] keys) {
    Map<String, String> result = Maps.newHashMap();
    for (String key : keys) {
      if (map.containsKey(key)) {
        result.put(key, map.get(key));
      }
    }
    return result;
  }

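  /**
   * Converts the sub-properties of the given prefix into a table keyed by
   * file group and header name. Each key is expected to have the form
   * {@code <fileGroup>.<headerName>}; for example, assuming the headers
   * prefix resolves to "headers.", a property {@code headers.fg1.topic = app}
   * yields the cell (fg1, topic, app).
   */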
  private Table<String, String, String> getTable(Context context, String prefix) {
    Table<String, String, String> table = HashBasedTable.create();
    for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
      String[] parts = e.getKey().split("\\.", 2);
      table.put(parts[0], parts[1], e.getValue());
    }
    return table;
  }

  @VisibleForTesting
  protected SourceCounter getSourceCounter() {
    return sourceCounter;
  }

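  /**
   * One poll cycle: refresh the set of tailed files, drain every file
   * that has new data, close the files flagged as idle, then sleep for
   * {@code retryInterval} ms before returning. Any failure makes the
   * runner back off.
   */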
  @Override
  public Status process() {
    Status status = Status.READY;

    try {
      existingInodes.clear();
      existingInodes.addAll(reader.updateTailFiles());

      for (Long inode : existingInodes) {
        TailFile tf = reader.getTailFiles(false).get(inode);
        if (tf != null && tf.needTail()) {
          tailFileProcess(tf, true);
        }
      }

      handleIdleFiles();

      try {
        TimeUnit.MILLISECONDS.sleep(retryInterval);
      } catch (InterruptedException e) {
        logger.info("Interrupted while sleeping");
        Thread.currentThread().interrupt();  // restore the interrupt flag for the runner
      }

    } catch (Throwable t) {
      logger.error("Unable to tail files", t);
      status = Status.BACKOFF;
    }

    return status;
  }

  @Override
  public long getBackOffSleepIncrement() {
    return backoffSleepIncrement;
  }

  @Override
  public long getMaxBackOffSleepInterval() {
    return maxBackOffSleepInterval;
  }

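  /**
   * Drains one file in batches of up to {@code batchSize} events until a
   * batch comes back short. If the channel rejects a batch, the same batch
   * is retried after {@code retryInterval} ms, which doubles on every
   * failure up to {@code maxRetryInterval} (1000, 2000, 4000, 5000 ms)
   * and resets to 1000 ms once a batch is accepted.
   */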
  private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
      throws IOException, InterruptedException {
    while (true) {
      reader.setCurrentFile(tf);
      List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);

      sourceCounter.addToEventReceivedCount(events.size());
      sourceCounter.incrementAppendBatchReceivedCount();

      try {
        if (!events.isEmpty()) {
          getChannelProcessor().processEventBatch(events);
        }
        reader.commit();
      } catch (ChannelException ex) {
        logger.warn("The channel is full or an unexpected failure occurred. " +
            "The source will try again after " + retryInterval + " ms", ex);
        TimeUnit.MILLISECONDS.sleep(retryInterval);
        retryInterval = Math.min(retryInterval << 1, maxRetryInterval);
        continue;
      }

      retryInterval = 1000;
      sourceCounter.addToEventAcceptedCount(events.size());
      sourceCounter.incrementAppendBatchAcceptedCount();

      if (events.size() < batchSize) {
        break;
      }
    }
  }

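  /**
   * Handles the files flagged by the idle checker: drains any remaining
   * data, closes the file handle, and forgets files that no longer exist
   * on disk. The idle list is cleared afterwards.
   */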
  private void handleIdleFiles() throws IOException, InterruptedException {
    Map<Long, TailFile> tfs = reader.getTailFiles(true);

    for (long inode : idleInodes) {
      TailFile tf = tfs.get(inode);

      if (null == tf) {
        continue;
      }

      if (tf.getRaf() != null) {  // the file is still open: drain it before closing
        tailFileProcess(tf, false);
        tf.close();
        logger.info("Temporarily close idle file: " + tf.getPath() +
            ", inode: " + inode +
            ", pos and line: " + tf.getCommitedPosAndLineIndex());
      }

      // forget the file if it no longer exists on disk
      File f = new File(tf.getPath());
      if (!f.exists()) {
        reader.getTailFiles(false).remove(inode);
        logger.info("Remove absent file from memory: " + tf.getPath() +
            ", inode: " + inode +
            ", pos and line: " + tf.getCommitedPosAndLineIndex());
      }
    }

    idleInodes.clear();
  }

  /**
   * Runnable that scans the tailed files and queues the inodes of files
   * which have been idle for longer than {@code idleTimeout}, so they can
   * be closed.
   */
  private class IdleFileCheckerRunnable implements Runnable {
    @Override
    public void run() {
      try {
        Map<Long, TailFile> tfs = reader.getTailFiles(true);
        long now = System.currentTimeMillis();

        for (TailFile tf : tfs.values()) {
          if (!tf.needTail() &&
              tf.getLastUpdated() + idleTimeout < now) {
            idleInodes.add(tf.getInode());
          }
        }

      } catch (Throwable t) {
        logger.error("Uncaught exception in IdleFileChecker thread", t);
      }
    }
  }

  /**
   * Runnable that periodically persists the last committed read position
   * of each tailed file to the position file.
   */
  private class PositionWriterRunnable implements Runnable {
    @Override
    public void run() {
      writePosition();
    }
  }

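  /**
   * Persists the current positions: the JSON is written to a temporary
   * ".tmp" file first and then moved over the position file, so a crash
   * during the write cannot corrupt the previous snapshot.
   */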
  private void writePosition() {
    String tmpFilePath = positionFilePath + ".tmp";
    File tmpFile = new File(tmpFilePath);

    try {
      String json = toPosInfoJson();
      if (json != null) {
        // Write to a temporary file first, then replace the real file.
        try (FileWriter writer = new FileWriter(tmpFile)) {
          writer.write(json);
        }
        Files.move(Paths.get(tmpFilePath), Paths.get(positionFilePath),
            StandardCopyOption.REPLACE_EXISTING);
      }
    } catch (Throwable t) {
      logger.error("Failed to update position file: " + positionFilePath, t);
    }
  }

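  /**
   * Serializes the committed position of every known file to JSON, or
   * returns null when there is nothing to record. Assuming the TXT_*
   * constants map to keys such as "inode", "pos", "lineIndex" and "file",
   * the output looks roughly like:
   *
   * <pre>
   * [{"inode":138547,"pos":4096,"lineIndex":57,"file":"/var/log/app/app.log"}]
   * </pre>
   */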
  private String toPosInfoJson() {
    List<Map<String, Object>> posInfos = Lists.newArrayList();

    Map<Long, TailFile> tfs = reader.getTailFiles(true);

    for (Long inode : existingInodes) {
      TailFile tf = tfs.get(inode);
      if (tf != null) {
        Pair<Long, Integer> pl = tf.getCommitedPosAndLineIndex();
        posInfos.add(ImmutableMap.<String, Object>of(
            TaildirSourceConfigurationConstants.TXT_INODE,      inode,
            TaildirSourceConfigurationConstants.TXT_POS,        pl.left,
            TaildirSourceConfigurationConstants.TXT_LINE_INDEX, pl.right,
            TaildirSourceConfigurationConstants.TXT_FILE,       tf.getPath()));
      }
    }

    if (posInfos.isEmpty()) {
      return null;
    }
    return new Gson().toJson(posInfos);
  }
}
